package com.bawei.persona.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.bawei.persona.realtime.app.func.MyHBaseSinkFunctionNew;
import com.bawei.persona.realtime.app.func.TableProcessHoKFunction;
import com.bawei.persona.realtime.bean.TableProcess;

import com.bawei.persona.realtime.util.MyKafkaUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/**
 * Reads business-database change records from the Kafka ODS layer, then splits them
 * dynamically: fact-table records go back to Kafka (DWD layer) and dimension-table
 * records go to HBase via a side output.
 *
 * Project planning and management: 孙丰朝, Dean, Shanghai Big Data College
 * Technical guidance and requirements analysis: 郭洵
 * Programming: 楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */

public class BaseDBAppPersona {

    public static void main(String[] args) throws Exception {
        //Read the raw change-log data stream,
        //use the configuration to pick out the data that should be split off to Kafka,
        //and use the configuration to pick out the dimension data that should be written to the database
        //TODO 1. Prepare the environment
        //1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set the parallelism
        env.setParallelism(1);
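        // Parallelism 1 keeps local debugging simple; in production this would
        // typically be raised, up to the partition count of the source Kafka topic.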
        //1.3 Enable checkpointing and set the related parameters
        //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(60000);
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/basedbapp"));
        //Restart strategy:
        // if checkpointing is not enabled, the default restart strategy is noRestart;
        // if checkpointing is enabled, Flink restarts the job automatically by default, up to Integer.MAX_VALUE times
        //env.setRestartStrategy(RestartStrategies.noRestart());
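        // A minimal sketch of an explicit restart policy instead of the defaults
        // (assumes imports of RestartStrategies and org.apache.flink.api.common.time.Time):
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));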

        //TODO 2.从Kafka的ODS层读取数据
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        //2.1 Get a Kafka consumer through the utility class
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        DataStreamSource<String> jsonStrDS = env.addSource(kafkaSource);
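        // MyKafkaUtil.getKafkaSource is this project's own helper; roughly, such a helper
        // wraps something like the following sketch (the broker address is an assumption):
        //Properties props = new Properties();
        //props.setProperty("bootstrap.servers", "hadoop202:9092");
        //props.setProperty("group.id", groupId);
        //return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);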
        //TODO 3. Convert the structure of the stream elements: String --> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = jsonStrDS.map(JSON::parseObject);
        jsonStrDS.print("json>>>>");
        //TODO 4. ETL: drop records whose table is missing, or whose data is missing or serializes to 3 characters or fewer
        SingleOutputStreamOperator<JSONObject> filteredDS = jsonObjDS.filter(
                jsonObj -> jsonObj.getString("table") != null
                        && jsonObj.getJSONObject("data") != null
                        && jsonObj.getString("data").length() > 3
        );
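        // For example, {"table":"user_info","type":"insert","data":{"id":1}} passes,
        // while {"table":"user_info","data":{}} is dropped: its "data" serializes to
        // "{}", only 2 characters, which does not exceed the 3-character threshold.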

        //TODO 5. Dynamic splitting: fact tables stay on the main stream and are written back
        //        to the Kafka DWD layer; dimension tables go to HBase via a side output
        //5.1 Define the side-output tag for the stream that goes to HBase
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE){};
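        // The trailing {} creates an anonymous subclass so Flink can capture the
        // OutputTag's generic type (JSONObject) despite Java's type erasure.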

        //Main stream: will be written out to Kafka
        SingleOutputStreamOperator<JSONObject> process = filteredDS.process(new TableProcessHoKFunction(hbaseTag));
        //Side output: will be written to HBase
        DataStream<JSONObject> sideOutput = process.getSideOutput(hbaseTag);

        //Write the side-output records directly into their HBase tables
        sideOutput.addSink(new MyHBaseSinkFunctionNew());
        //Build a Kafka producer that routes each record to the topic named in its sink_table field
        FlinkKafkaProducer<JSONObject> kafkaSinkBySchema = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("Kafka serialization schema initialized");
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
                String sinkTopic = jsonObj.getString("sink_table");
                JSONObject data = jsonObj.getJSONObject("data");
                return new ProducerRecord<>(sinkTopic, data.toString().getBytes());
            }
        });
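        // For example, a main-stream record whose sink_table is "dwd_order_info" is
        // produced to the Kafka topic "dwd_order_info" with its "data" JSON as the value.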
        //Write the split-off fact data to the DWD layer in Kafka
        process.addSink(kafkaSinkBySchema);
        env.execute("BaseDBAppPersona");
    }
}
